﻿using iTool.Common.Options;
using Orleans;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace iTool.ClusterComponent
{
    public class StreamQueueGrain : Orleans.Grain, IStreamQueueGrain
    {
        //private readonly Queue<MessageData> eventQueue = new Queue<MessageData>();
        //private long sequenceNumber = DateTime.UtcNow.Ticks;

        IQueueStorageProvider queueStorageProvider;
        AdoNetOptions options;
        public StreamQueueGrain(IQueueStorageProvider queueStorageProvider, AdoNetOptions options)
        {
            this.queueStorageProvider = queueStorageProvider;
            this.options = options;
        }

        public override Task OnActivateAsync() 
        {
            this.GetPrimaryKey(out string queueId);
            queueId = queueId.Replace("-", "_");
            this.queueStorageProvider = queueStorageProvider.CreateInstance(this.options, queueId);
            return Task.CompletedTask;
        }

        public async Task Enqueue(MessageData data)
        {
            //data.SequenceNumber = sequenceNumber++;
            //eventQueue.Enqueue(data);
            //await Task.CompletedTask;
            //return;


            var service = this.GrainFactory.GetGrain<IStreamSequenceNumber>(0);
            data.SequenceNumber = await service.GetSequenceNumber();
            //eventQueue.Enqueue(data);
            await this.queueStorageProvider.EnqueueAsync(data);
        }

        public async Task<IEnumerable<MessageData>> Dequeue(int maxCount)
        {

            //List<MessageData> list = new List<MessageData>();

            //for (int i = 0; i < maxCount && eventQueue.Count > 0; ++i)
            //{
            //    list.Add(eventQueue.Dequeue());
            //}

            //return await Task.FromResult(list);

            //var service = this.GrainFactory.GetGrain<IStreamQueueOffset>(string.Empty);
            var service = this.GrainFactory.GetGrain<IStreamQueueOffset>(Guid.Empty, "Aggregation");
            long offset = await service.GetNextOffset();

            var result = await this.queueStorageProvider.DequeueAsync(offset, maxCount);

            if (result.Count() > 0)
            {
                long maxOffset = result.Max(item => item.SequenceNumber);
                await service.SetNextStartOffset(maxOffset + 1);
            }

            return result;

        }
    }
}
